Add AgentRunner and LongRunnable to support long running agents

This changes HuginnScheduler and TwitterStream to use the new
AgentRunner

TwitterStream now uses the `twitter` gem to access the streaming API
which removes the `eventmachine` dependency (in production).

Make JabberAgent FormConfigurable and LongRunnable

Expect LongRunnable::Worker#run to be overridden

Dominik Sander 9 years ago
parent
commit
0f64e4e6b9

+ 3 - 2
Gemfile

@@ -25,7 +25,6 @@ gem "google-api-client", require: 'google/api_client'
25 25
 
26 26
 # Twitter Agents
27 27
 gem 'twitter', '~> 5.14.0' # Must to be loaded before cantino-twitter-stream.
28
-gem 'twitter-stream', github: 'cantino/twitter-stream', branch: 'huginn'
29 28
 gem 'omniauth-twitter'
30 29
 
31 30
 # Tumblr Agents
@@ -44,6 +43,9 @@ gem 'omniauth-37signals'          # BasecampAgent
44 43
 # gem 'omniauth-github'
45 44
 gem 'omniauth-wunderlist', github: 'wunderlist/omniauth-wunderlist', ref: 'd0910d0396107b9302aa1bc50e74bb140990ccb8'
46 45
 
46
+# Uncomment to use 'em_http' as FARADAY_HTTP_BACKEND
47
+# gem 'em-http-request', '~> 1.1.2'
48
+
47 49
 # Bundler <1.5 does not recognize :x64_mingw as a valid platform name.
48 50
 # Unfortunately, it can't self-update because it errors when encountering :x64_mingw.
49 51
 unless Gem::Version.new(Bundler::VERSION) >= Gem::Version.new('1.5.0')
@@ -62,7 +64,6 @@ gem 'delayed_job', '~> 4.0.0'
62 64
 gem 'delayed_job_active_record', :git => 'https://github.com/cantino/delayed_job_active_record', :branch => 'configurable-reserve-sql-strategy'
63 65
 gem 'devise', '~> 3.4.0'
64 66
 gem 'dotenv-rails', '~> 2.0.1'
65
-gem 'em-http-request', '~> 1.1.2'
66 67
 gem 'faraday', '~> 0.9.0'
67 68
 gem 'faraday_middleware'
68 69
 gem 'feed-normalizer'

+ 0 - 20
Gemfile.lock

@@ -1,14 +1,4 @@
1 1
 GIT
2
-  remote: git://github.com/cantino/twitter-stream.git
3
-  revision: f7e7edb0bae013bffabf3598e7147773d9fd370f
4
-  branch: huginn
5
-  specs:
6
-    twitter-stream (0.1.15)
7
-      eventmachine (~> 1.0.7)
8
-      http_parser.rb (~> 0.6.0)
9
-      simple_oauth (~> 0.3.0)
10
-
11
-GIT
12 2
   remote: git://github.com/cantino/weibo_2.git
13 3
   revision: 00e57d29d8252126014b038cd738b02e05e4cfc5
14 4
   branch: master
@@ -148,14 +138,6 @@ GEM
148 138
       hashie
149 139
       multi_json
150 140
       oauth
151
-    em-http-request (1.1.2)
152
-      addressable (>= 2.3.4)
153
-      cookiejar
154
-      em-socksify (>= 0.3)
155
-      eventmachine (>= 1.0.3)
156
-      http_parser.rb (>= 0.6.0)
157
-    em-socksify (0.3.0)
158
-      eventmachine (>= 1.0.0.beta.4)
159 141
     em-websocket (0.5.1)
160 142
       eventmachine (>= 0.12.9)
161 143
       http_parser.rb (~> 0.6.0)
@@ -536,7 +518,6 @@ DEPENDENCIES
536 518
   devise (~> 3.4.0)
537 519
   dotenv-rails (~> 2.0.1)
538 520
   dropbox-api
539
-  em-http-request (~> 1.1.2)
540 521
   faraday (~> 0.9.0)
541 522
   faraday_middleware
542 523
   feed-normalizer
@@ -598,7 +579,6 @@ DEPENDENCIES
598 579
   tumblr_client
599 580
   twilio-ruby (~> 3.11.5)
600 581
   twitter (~> 5.14.0)
601
-  twitter-stream!
602 582
   typhoeus (~> 0.6.3)
603 583
   tzinfo (>= 1.2.0)
604 584
   tzinfo-data

+ 113 - 0
app/concerns/long_runnable.rb

@@ -0,0 +1,113 @@
1
+=begin
2
+Usage Example:
3
+
4
+class Agents::ExampleAgent < Agent
5
+  include LongRunnable
6
+
7
+  # Optional
8
+  #   Override this method if you need to group multiple agents based on an API key,
9
+  #   or server they connect to.
10
+  #   Have a look at the TwitterStreamAgent for an example.
11
+  def self.setup_worker; end
12
+
13
+  class Worker < LongRunnable::Worker
14
+    # Optional
15
+    #   Called after initialization of the Worker class, use this method as an initializer.
16
+    def setup; end
17
+
18
+    # Required
19
+    #  Put your agent logic in here; it must not return. If it does, your agent will be restarted.
20
+    def run; end
21
+
22
+    # Optional
23
+    #   Use this method the gracefully stop your agent but make sure the run method return, or
24
+    #   terminate the thread.
25
+    def stop; end
26
+  end
27
+end
28
+=end
29
+module LongRunnable
30
+  extend ActiveSupport::Concern
31
+
32
+  included do |base|
33
+    AgentRunner.register(base)
34
+  end
35
+
36
+  def start_worker?
37
+    true
38
+  end
39
+
40
+  def worker_id(config = nil)
41
+    "#{self.class.to_s}-#{id}-#{Digest::SHA1.hexdigest((config.presence || options).to_json)}"
42
+  end
43
+
44
+  module ClassMethods
45
+    def setup_worker
46
+      active.map do |agent|
47
+        next unless agent.start_worker?
48
+        self::Worker.new(id: agent.worker_id, agent: agent)
49
+      end.compact
50
+    end
51
+  end
52
+
53
+  class Worker
54
+    attr_reader :thread, :id, :agent, :config, :mutex
55
+
56
+    def initialize(options = {})
57
+      @id = options[:id]
58
+      @agent = options[:agent]
59
+      @config = options[:config]
60
+    end
61
+
62
+    def run
63
+      raise StandardError, 'Override LongRunnable::Worker#run in your agent Worker subclass.'
64
+    end
65
+
66
+    def run!
67
+      @thread = Thread.new do
68
+        begin
69
+          run
70
+        rescue SignalException, SystemExit
71
+          stop!
72
+        rescue StandardError => e
73
+          message = "Exception #{e.message}:\n#{e.backtrace.first(10).join("\n")}"
74
+          STDERR.puts "\n#{message}\n\n"
75
+          agent.error(message)
76
+        end
77
+      end
78
+    end
79
+
80
+    def setup!(scheduler, mutex)
81
+      @scheduler = scheduler
82
+      @mutex = mutex
83
+      setup if respond_to?(:setup)
84
+    end
85
+
86
+    def stop!
87
+      @scheduler.jobs(tag: id).each(&:unschedule)
88
+
89
+      if respond_to?(:stop)
90
+        stop
91
+      else
92
+        @thread.terminate
93
+      end
94
+    end
95
+
96
+    def every(*args, &blk)
97
+      schedule(:every, args, &blk)
98
+    end
99
+
100
+    def cron(*args, &blk)
101
+      schedule(:cron, args, &blk)
102
+    end
103
+
104
+    def boolify(value)
105
+      agent.send(:boolify, value)
106
+    end
107
+
108
+    private
109
+    def schedule(method, args, &blk)
110
+      @scheduler.send(method, *args, tag: id, &blk)
111
+    end
112
+  end
113
+end

+ 83 - 1
app/models/agents/jabber_agent.rb

@@ -1,7 +1,9 @@
1 1
 module Agents
2 2
   class JabberAgent < Agent
3
+    include LongRunnable
4
+    include FormConfigurable
5
+
3 6
     cannot_be_scheduled!
4
-    cannot_create_events!
5 7
 
6 8
     gem_dependency_check { defined?(Jabber) }
7 9
 
@@ -16,9 +18,22 @@ module Agents
16 18
       can contain any keys found in the source's payload, escaped using double curly braces.
17 19
       ex: `"News Story: {{title}}: {{url}}"`
18 20
 
21
+      When `connect_to_receiver` is set to true, the JabberAgent will emit an event for every message it receives.
22
+
19 23
       Have a look at the [Wiki](https://github.com/cantino/huginn/wiki/Formatting-Events-using-Liquid) to learn more about liquid templating.
20 24
     MD
21 25
 
26
+    event_description <<-MD
27
+      `event` will be set to either `on_join`, `on_leave`, `on_message`, `on_room_message` or `on_subject`
28
+
29
+          {
30
+            "event": "on_message",
31
+            "time": null,
32
+            "nick": "Dominik Sander",
33
+            "message": "Hello from huginn."
34
+          }
35
+    MD
36
+
22 37
     def default_options
23 38
       {
24 39
         'jabber_server'   => '127.0.0.1',
@@ -31,6 +46,15 @@ module Agents
31 46
       }
32 47
     end
33 48
 
49
+    form_configurable :jabber_server
50
+    form_configurable :jabber_port
51
+    form_configurable :jabber_sender
52
+    form_configurable :jabber_receiver
53
+    form_configurable :jabber_password
54
+    form_configurable :message, type: :text
55
+    form_configurable :connect_to_receiver, type: :boolean
56
+    form_configurable :expected_receive_period_in_days
57
+
34 58
     def working?
35 59
       last_receive_at && last_receive_at > interpolated['expected_receive_period_in_days'].to_i.days.ago && !recent_error_logs?
36 60
     end
@@ -50,6 +74,10 @@ module Agents
50 74
       client.send Jabber::Message::new(interpolated['jabber_receiver'], text).set_type(:chat)
51 75
     end
52 76
 
77
+    def start_worker?
78
+      boolify(interpolated[:connect_to_receiver])
79
+    end
80
+
53 81
     private
54 82
 
55 83
     def client
@@ -66,5 +94,59 @@ module Agents
66 94
     def body(event)
67 95
       interpolated(event)['message']
68 96
     end
97
+
98
+    class Worker < LongRunnable::Worker
99
+      IGNORE_MESSAGES_FOR=5
100
+
101
+      def setup
102
+        require 'xmpp4r/muc/helper/simplemucclient'
103
+      end
104
+
105
+      def run
106
+        @started_at = Time.now
107
+        @client = client
108
+        muc = Jabber::MUC::SimpleMUCClient.new(@client)
109
+
110
+        [:on_join, :on_leave, :on_message, :on_room_message, :on_subject].each do |event|
111
+          muc.__send__(event) do |*args|
112
+            message_handler(event, args)
113
+          end
114
+        end
115
+
116
+        muc.join(agent.interpolated['jabber_receiver'])
117
+
118
+        sleep(1) while @client.is_connected?
119
+      end
120
+
121
+      def message_handler(event, args)
122
+        return if Time.now - @started_at < IGNORE_MESSAGES_FOR
123
+
124
+        time, nick, message = normalize_args(event, args)
125
+
126
+        agent.create_event(payload: {event: event, time: time, nick: nick, message: message})
127
+      end
128
+
129
+      def stop
130
+        @client.close
131
+        @client.stop
132
+        thread.terminate
133
+      end
134
+
135
+      def client
136
+        agent.send(:client)
137
+      end
138
+
139
+      private
140
+      def normalize_args(event, args)
141
+        case event
142
+        when :on_join, :on_leave
143
+          [args[0], args[1]]
144
+        when :on_message, :on_subject
145
+          args
146
+        when :on_room_message
147
+          [args[0], nil, args[1]]
148
+        end
149
+      end
150
+    end
69 151
   end
70 152
 end

+ 87 - 0
app/models/agents/twitter_stream_agent.rb

@@ -1,6 +1,7 @@
1 1
 module Agents
2 2
   class TwitterStreamAgent < Agent
3 3
     include TwitterConcern
4
+    include LongRunnable
4 5
 
5 6
     cannot_receive_events!
6 7
 
@@ -122,5 +123,91 @@ module Agents
122 123
         end
123 124
       end
124 125
     end
126
+
127
+    def self.setup_worker
128
+      if Agents::TwitterStreamAgent.dependencies_missing?
129
+        STDERR.puts Agents::TwitterStreamAgent.twitter_dependencies_missing
130
+        STDERR.flush
131
+        return false
132
+      end
133
+
134
+      Agents::TwitterStreamAgent.active.group_by { |agent| agent.twitter_oauth_token }.map do |oauth_token, agents|
135
+        filter_to_agent_map = agents.map { |agent| agent.options[:filters] }.flatten.uniq.compact.map(&:strip).inject({}) { |m, f| m[f] = []; m }
136
+
137
+        agents.each do |agent|
138
+          agent.options[:filters].flatten.uniq.compact.map(&:strip).each do |filter|
139
+            filter_to_agent_map[filter] << agent
140
+          end
141
+        end
142
+
143
+        config_hash = filter_to_agent_map.map { |k, v| [k, v.map(&:id)] } << oauth_token
144
+
145
+        Worker.new(id: agents.first.worker_id(config_hash),
146
+                   config: {filter_to_agent_map: filter_to_agent_map},
147
+                   agent: agents.first)
148
+      end
149
+    end
150
+
151
+    class Worker < LongRunnable::Worker
152
+      RELOAD_TIMEOUT = 10.minutes
153
+      DUPLICATE_DETECTION_LENGTH = 1000
154
+      SEPARATOR = /[^\w_\-]+/
155
+
156
+      def setup
157
+        @timeout = 0
158
+      end
159
+
160
+      def run
161
+        recent_tweets = []
162
+        filter_to_agent_map = @config[:filter_to_agent_map]
163
+
164
+        stream!(filter_to_agent_map.keys, @agent) do |status|
165
+          if status["retweeted_status"].present? && status["retweeted_status"].is_a?(Hash)
166
+            puts "Skipping retweet: #{status["text"]}"
167
+          elsif recent_tweets.include?(status["id_str"])
168
+            puts "Skipping duplicate tweet: #{status["text"]}"
169
+          else
170
+            recent_tweets << status["id_str"]
171
+            recent_tweets.shift if recent_tweets.length > DUPLICATE_DETECTION_LENGTH
172
+            puts status["text"]
173
+            filter_to_agent_map.keys.each do |filter|
174
+              if (filter.downcase.split(SEPARATOR) - status["text"].downcase.split(SEPARATOR)).reject(&:empty?) == [] # Hacky McHackerson
175
+                filter_to_agent_map[filter].each do |agent|
176
+                  puts " -> #{agent.name}"
177
+                  agent.process_tweet(filter, status)
178
+                end
179
+              end
180
+            end
181
+          end
182
+        end
183
+      end
184
+
185
+      private
186
+      def stream!(filters, agent, &block)
187
+        filters = filters.map(&:downcase).uniq
188
+
189
+        method = (filters && filters.length > 0) ? [:filter, track: filters.map {|f| CGI::escape(f) }.join(",")] : [:sample]
190
+        client.send(*method) do |tweet|
191
+          @timeout = 0
192
+          return unless tweet.class == Twitter::Tweet
193
+          status = ActiveSupport::HashWithIndifferentAccess.new(tweet.to_h)
194
+          status['text'] = status['text'].gsub(/&lt;/, "<").gsub(/&gt;/, ">").gsub(/[\t\n\r]/, '  ')
195
+          yield status
196
+        end
197
+      rescue Twitter::Error => e
198
+        @timeout += 60
199
+        puts "Twitter raised '#{e.class}', sleeping for #{@timeout} seconds"
200
+        sleep @timeout
201
+      end
202
+
203
+      def client
204
+        @client ||= Twitter::Streaming::Client.new do |config|
205
+          config.consumer_key        = @agent.twitter_consumer_key
206
+          config.consumer_secret     = @agent.twitter_consumer_secret
207
+          config.access_token        = @agent.twitter_oauth_token
208
+          config.access_token_secret = @agent.twitter_oauth_token_secret
209
+        end
210
+      end
211
+    end
125 212
   end
126 213
 end

+ 19 - 0
bin/agent_runner.rb

@@ -0,0 +1,19 @@
1
+#!/usr/bin/env ruby
2
+
3
+# This process is used to maintain Huginn's upkeep behavior, automatically running scheduled Agents and
4
+# periodically propagating and expiring Events. It also running TwitterStreamAgents and Agents that support long running
5
+# background jobs.
6
+
7
+Dotenv.load if Rails.env == 'development'
8
+
9
+require 'agent_runner'
10
+
11
+unless defined?(Rails)
12
+  puts
13
+  puts "Please run me with rails runner, for example:"
14
+  puts "  RAILS_ENV=production bundle exec rails runner bin/agent_runner.rb"
15
+  puts
16
+  exit 1
17
+end
18
+
19
+AgentRunner.new(except: DelayedJobWorker).run

+ 5 - 2
bin/schedule.rb

@@ -3,6 +3,10 @@
3 3
 # This process is used to maintain Huginn's upkeep behavior, automatically running scheduled Agents and
4 4
 # periodically propagating and expiring Events.  It's typically run via foreman and the included Procfile.
5 5
 
6
+Dotenv.load if Rails.env == 'development'
7
+
8
+require 'agent_runner'
9
+
6 10
 unless defined?(Rails)
7 11
   puts
8 12
   puts "Please run me with rails runner, for example:"
@@ -11,5 +15,4 @@ unless defined?(Rails)
11 15
   exit 1
12 16
 end
13 17
 
14
-scheduler = HuginnScheduler.new(frequency: ENV['SCHEDULER_FREQUENCY'].presence || 0.3)
15
-scheduler.run!
18
+AgentRunner.new(only: HuginnScheduler).run

+ 14 - 56
bin/threaded.rb

@@ -1,65 +1,23 @@
1
-require 'thread'
2
-require 'huginn_scheduler'
3
-require 'twitter_stream'
1
+#!/usr/bin/env ruby
4 2
 
5
-Rails.configuration.cache_classes = true
3
+Dotenv.load if Rails.env == 'development'
6 4
 
7
-STDOUT.sync = true
8
-STDERR.sync = true
5
+require 'agent_runner'
9 6
 
10
-def stop
11
-  puts 'Exiting...'
12
-  @scheduler.stop
13
-  @dj.stop
14
-  @stream.stop
7
+unless defined?(Rails)
8
+  puts
9
+  puts "Please run me with rails runner, for example:"
10
+  puts "  RAILS_ENV=production bundle exec rails runner bin/threaded.rb"
11
+  puts
12
+  exit 1
15 13
 end
16 14
 
17
-def safely(&block)
18
-  begin
19
-    yield block
20
-  rescue StandardError => e
21
-    STDERR.puts "\nException #{e.message}:\n#{e.backtrace.join("\n")}\n\n"
22
-    STDERR.puts "Terminating myself ..."
23
-    STDERR.flush
24
-    stop
25
-  end
26
-end
27
-
28
-threads = []
29
-threads << Thread.new do
30
-  safely do
31
-    @stream = TwitterStream.new
32
-    @stream.run
33
-    puts "Twitter stream stopped ..."
34
-  end
35
-end
36
-
37
-threads << Thread.new do
38
-  safely do
39
-    @scheduler = HuginnScheduler.new(frequency: ENV['SCHEDULER_FREQUENCY'].presence || 0.3)
40
-    @scheduler.run!
41
-    puts "Scheduler stopped ..."
42
-  end
43
-end
44
-
45
-threads << Thread.new do
46
-  safely do
47
-    require 'delayed/command'
48
-    @dj = Delayed::Worker.new
49
-    @dj.start
50
-    puts "Delayed job stopped ..."
51
-  end
52
-end
15
+agent_runner = AgentRunner.new
53 16
 
54 17
 # We need to wait a bit to let delayed_job set its traps so we can override them
55
-sleep 0.5
56
-
57
-trap('TERM') do
58
-  stop
59
-end
60
-
61
-trap('INT') do
62
-  stop
18
+Thread.new do
19
+  sleep 5
20
+  agent_runner.set_traps
63 21
 end
64 22
 
65
-threads.collect { |t| t.join }
23
+agent_runner.run

+ 5 - 1
bin/twitter_stream.rb

@@ -4,6 +4,10 @@
4 4
 # new or changed TwitterStreamAgents and starts to follow the stream for them.  It is typically run by foreman via
5 5
 # the included Procfile.
6 6
 
7
+Dotenv.load if Rails.env == 'development'
8
+
9
+require 'agent_runner'
10
+
7 11
 unless defined?(Rails)
8 12
   puts
9 13
   puts "Please run me with rails runner, for example:"
@@ -12,4 +16,4 @@ unless defined?(Rails)
12 16
   exit 1
13 17
 end
14 18
 
15
-TwitterStream.new.run
19
+AgentRunner.new(only: Agents::TwitterStreamAgent).run

+ 116 - 0
lib/agent_runner.rb

@@ -0,0 +1,116 @@
1
+require 'cgi'
2
+require 'json'
3
+require 'rufus-scheduler'
4
+require 'pp'
5
+require 'twitter'
6
+
7
+Rails.configuration.cache_classes = true
8
+
9
+class AgentRunner
10
+  @@agents = []
11
+
12
+  def initialize(options = {})
13
+    @workers = {}
14
+    @signal_queue = []
15
+    @options = options
16
+    @options[:only] = [@options[:only]].flatten if @options[:only]
17
+    @options[:except] = [@options[:except]].flatten if @options[:except]
18
+    @mutex = Mutex.new
19
+    @scheduler = Rufus::Scheduler.new(frequency: ENV['SCHEDULER_FREQUENCY'].presence || 0.3)
20
+
21
+    @scheduler.every 5 do
22
+      restart_dead_workers if @running
23
+    end
24
+
25
+    @scheduler.every 60 do
26
+      run_workers if @running
27
+    end
28
+
29
+    set_traps
30
+  end
31
+
32
+  def stop
33
+    puts "Stopping AgentRunner..."
34
+    @running = false
35
+    @workers.each_pair do |_, w| w.stop! end
36
+    @scheduler.stop
37
+  end
38
+
39
+  def run
40
+    @running = true
41
+    run_workers
42
+
43
+    while @running
44
+      #puts "r"
45
+      if signal = @signal_queue.shift
46
+        handle_signal(signal)
47
+      end
48
+      sleep 0.25
49
+    end
50
+    @scheduler.join
51
+  end
52
+
53
+  def set_traps
54
+    %w(INT TERM QUIT).each do |signal|
55
+      Signal.trap(signal) { @signal_queue << signal }
56
+    end
57
+  end
58
+
59
+  def self.register(agent)
60
+    @@agents << agent unless @@agents.include?(agent)
61
+  end
62
+
63
+  private
64
+  def run_workers
65
+    workers             = load_workers
66
+    new_worker_ids      = workers.keys
67
+    current_worker_ids  = @workers.keys
68
+
69
+    (current_worker_ids - new_worker_ids).each do |outdated_worker_id|
70
+      puts "Killing #{outdated_worker_id}"
71
+      @workers[outdated_worker_id].stop!
72
+      @workers.delete(outdated_worker_id)
73
+    end
74
+
75
+    (new_worker_ids - current_worker_ids).each do |new_worker_id|
76
+      puts "Starting #{new_worker_id}"
77
+      @workers[new_worker_id] = workers[new_worker_id]
78
+      @workers[new_worker_id].setup!(@scheduler, @mutex)
79
+      @workers[new_worker_id].run!
80
+    end
81
+  end
82
+
83
+  def load_workers
84
+    workers = {}
85
+    @@agents.each do |klass|
86
+      next if @options[:only] && !@options[:only].include?(klass)
87
+      next if @options[:except] && @options[:except].include?(klass)
88
+
89
+      (klass.setup_worker || []).each do |agent_worker|
90
+        workers[agent_worker.id] = agent_worker
91
+      end
92
+    end
93
+    workers
94
+  end
95
+
96
+  def restart_dead_workers
97
+    @workers.each_pair do |id, worker|
98
+      if worker.thread && !worker.thread.alive?
99
+        puts "Restarting #{id.to_s}"
100
+        @workers[id].run!
101
+      end
102
+    end
103
+  end
104
+
105
+  def handle_signal(signal)
106
+    case signal
107
+    when 'INT', 'TERM', 'QUIT'
108
+      stop
109
+    end
110
+  end
111
+end
112
+
113
+require 'agents/twitter_stream_agent'
114
+require 'agents/jabber_agent'
115
+require 'huginn_scheduler'
116
+require 'delayed_job_worker'

+ 16 - 0
lib/delayed_job_worker.rb

@@ -0,0 +1,16 @@
1
+class DelayedJobWorker < LongRunnable::Worker
2
+  include LongRunnable
3
+
4
+  def run
5
+    @dj = Delayed::Worker.new
6
+    @dj.start
7
+  end
8
+
9
+  def stop
10
+    @dj.stop
11
+  end
12
+
13
+  def self.setup_worker
14
+    [new(id: self.to_s)]
15
+  end
16
+end

+ 20 - 22
lib/huginn_scheduler.rb

@@ -92,58 +92,56 @@ class Rufus::Scheduler
92 92
   end
93 93
 end
94 94
 
95
-class HuginnScheduler
96
-  FAILED_JOBS_TO_KEEP = 100
97
-  attr_accessor :mutex
98
-
99
-  def initialize(options = {})
100
-    @rufus_scheduler = Rufus::Scheduler.new(options)
101
-    self.mutex = Mutex.new
102
-  end
95
+class HuginnScheduler < LongRunnable::Worker
96
+  include LongRunnable
103 97
 
104
-  def stop
105
-    @rufus_scheduler.stop
106
-  end
98
+  FAILED_JOBS_TO_KEEP = 100
107 99
 
108
-  def run!
100
+  def setup
109 101
     tzinfo_friendly_timezone = ActiveSupport::TimeZone::MAPPING[ENV['TIMEZONE'].presence || "Pacific Time (US & Canada)"]
110 102
 
111 103
     # Schedule event propagation.
112
-    @rufus_scheduler.every '1m' do
104
+    every '1m' do
113 105
       propagate!
114 106
     end
115 107
 
116 108
     # Schedule event cleanup.
117
-    @rufus_scheduler.every ENV['EVENT_EXPIRATION_CHECK'].presence || '6h' do
109
+    every ENV['EVENT_EXPIRATION_CHECK'].presence || '6h' do
118 110
       cleanup_expired_events!
119 111
     end
120 112
 
121 113
     # Schedule failed job cleanup.
122
-    @rufus_scheduler.every '1h' do
114
+    every '1h' do
123 115
       cleanup_failed_jobs!
124 116
     end
125 117
 
126 118
     # Schedule repeating events.
127 119
     %w[1m 2m 5m 10m 30m 1h 2h 5h 12h 1d 2d 7d].each do |schedule|
128
-      @rufus_scheduler.every schedule do
120
+      every schedule do
129 121
         run_schedule "every_#{schedule}"
130 122
       end
131 123
     end
132 124
 
133 125
     # Schedule events for specific times.
134 126
     24.times do |hour|
135
-      @rufus_scheduler.cron "0 #{hour} * * * " + tzinfo_friendly_timezone do
127
+      cron "0 #{hour} * * * " + tzinfo_friendly_timezone do
136 128
         run_schedule hour_to_schedule_name(hour)
137 129
       end
138 130
     end
139 131
 
140 132
     # Schedule Scheduler Agents
141 133
 
142
-    @rufus_scheduler.every '1m' do
143
-      @rufus_scheduler.schedule_scheduler_agents
134
+    every '1m' do
135
+      @scheduler.schedule_scheduler_agents
144 136
     end
137
+  end
138
+
139
+  def run
140
+    @scheduler.join
141
+  end
145 142
 
146
-    @rufus_scheduler.join
143
+  def self.setup_worker
144
+    [new(id: self.to_s)]
147 145
   end
148 146
 
149 147
   private
@@ -187,8 +185,8 @@ class HuginnScheduler
187 185
   end
188 186
 
189 187
   def with_mutex
190
-    ActiveRecord::Base.connection_pool.with_connection do
191
-      mutex.synchronize do
188
+    mutex.synchronize do
189
+      ActiveRecord::Base.connection_pool.with_connection do
192 190
         yield
193 191
       end
194 192
     end

+ 0 - 134
lib/twitter_stream.rb

@@ -1,134 +0,0 @@
1
-require 'cgi'
2
-require 'json'
3
-require 'em-http-request'
4
-require 'pp'
5
-
6
-class TwitterStream
7
-  def initialize
8
-    @running = true
9
-  end
10
-
11
-  def stop
12
-    @running = false
13
-  end
14
-
15
-  def stream!(filters, agent, &block)
16
-    filters = filters.map(&:downcase).uniq
17
-
18
-    stream = Twitter::JSONStream.connect(
19
-      :path    => "/1/statuses/#{(filters && filters.length > 0) ? 'filter' : 'sample'}.json#{"?track=#{filters.map {|f| CGI::escape(f) }.join(",")}" if filters && filters.length > 0}",
20
-      :ssl     => true,
21
-      :oauth   => {
22
-        :consumer_key    => agent.twitter_consumer_key,
23
-        :consumer_secret => agent.twitter_consumer_secret,
24
-        :access_key      => agent.twitter_oauth_token,
25
-        :access_secret   => agent.twitter_oauth_token_secret
26
-      }
27
-    )
28
-
29
-    stream.each_item do |status|
30
-      status = JSON.parse(status) if status.is_a?(String)
31
-      next unless status
32
-      next if status.has_key?('delete')
33
-      next unless status['text']
34
-      status['text'] = status['text'].gsub(/&lt;/, "<").gsub(/&gt;/, ">").gsub(/[\t\n\r]/, '  ')
35
-      block.call(status)
36
-    end
37
-
38
-    stream.on_error do |message|
39
-      STDERR.puts " --> Twitter error: #{message} <--"
40
-    end
41
-
42
-    stream.on_no_data do |message|
43
-      STDERR.puts " --> Got no data for awhile; trying to reconnect."
44
-      EventMachine::stop_event_loop
45
-    end
46
-
47
-    stream.on_max_reconnects do |timeout, retries|
48
-      STDERR.puts " --> Oops, tried too many times! <--"
49
-      EventMachine::stop_event_loop
50
-    end
51
-  end
52
-
53
-  def load_and_run(agents)
54
-    agents.group_by { |agent| agent.twitter_oauth_token }.each do |oauth_token, agents|
55
-      filter_to_agent_map = agents.map { |agent| agent.options[:filters] }.flatten.uniq.compact.map(&:strip).inject({}) { |m, f| m[f] = []; m }
56
-
57
-      agents.each do |agent|
58
-        agent.options[:filters].flatten.uniq.compact.map(&:strip).each do |filter|
59
-          filter_to_agent_map[filter] << agent
60
-        end
61
-      end
62
-
63
-      recent_tweets = []
64
-
65
-      stream!(filter_to_agent_map.keys, agents.first) do |status|
66
-        if status["retweeted_status"].present? && status["retweeted_status"].is_a?(Hash)
67
-          puts "Skipping retweet: #{status["text"]}"
68
-        elsif recent_tweets.include?(status["id_str"])
69
-          puts "Skipping duplicate tweet: #{status["text"]}"
70
-        else
71
-          recent_tweets << status["id_str"]
72
-          recent_tweets.shift if recent_tweets.length > DUPLICATE_DETECTION_LENGTH
73
-          puts status["text"]
74
-          filter_to_agent_map.keys.each do |filter|
75
-            if (filter.downcase.split(SEPARATOR) - status["text"].downcase.split(SEPARATOR)).reject(&:empty?) == [] # Hacky McHackerson
76
-              filter_to_agent_map[filter].each do |agent|
77
-                puts " -> #{agent.name}"
78
-                agent.process_tweet(filter, status)
79
-              end
80
-            end
81
-          end
82
-        end
83
-      end
84
-    end
85
-  end
86
-
87
-  RELOAD_TIMEOUT = 10.minutes
88
-  DUPLICATE_DETECTION_LENGTH = 1000
89
-  SEPARATOR = /[^\w_\-]+/
90
-
91
-  def run
92
-    if Agents::TwitterStreamAgent.dependencies_missing?
93
-      STDERR.puts Agents::TwitterStreamAgent.twitter_dependencies_missing
94
-      STDERR.flush
95
-      return
96
-    end
97
-
98
-    require 'twitter/json_stream'
99
-
100
-    while @running
101
-      begin
102
-        agents = Agents::TwitterStreamAgent.active.all
103
-
104
-        EventMachine::run do
105
-          EventMachine.add_periodic_timer(1) {
106
-            EventMachine::stop_event_loop if !@running
107
-          }
108
-
109
-          EventMachine.add_periodic_timer(RELOAD_TIMEOUT) {
110
-            puts "Reloading EventMachine and all Agents..."
111
-            EventMachine::stop_event_loop
112
-          }
113
-
114
-          if agents.length == 0
115
-            puts "No agents found.  Will look again in a minute."
116
-            EventMachine.add_timer(60) {
117
-              EventMachine::stop_event_loop
118
-            }
119
-          else
120
-            puts "Found #{agents.length} agent(s).  Loading them now..."
121
-            load_and_run agents
122
-          end
123
-        end
124
-      rescue SignalException, SystemExit
125
-        @running = false
126
-        EventMachine::stop_event_loop if EventMachine.reactor_running?
127
-      rescue StandardError => e
128
-        STDERR.puts "\nException #{e.message}:\n#{e.backtrace.join("\n")}\n\n"
129
-        STDERR.puts "Waiting for a couple of minutes..."
130
-        sleep 120
131
-      end
132
-    end
133
-  end
134
-end

+ 88 - 0
spec/concerns/long_runnable_spec.rb

@@ -0,0 +1,88 @@
1
+require 'spec_helper'
2
+
3
+describe LongRunnable do
4
+  class LongRunnableAgent < Agent
5
+    include LongRunnable
6
+
7
+    def default_options
8
+      {test: 'test'}
9
+    end
10
+  end
11
+
12
+  before(:all) do
13
+    @agent = LongRunnableAgent.new
14
+  end
15
+
16
+  it "start_worker? defaults to true" do
17
+    expect(@agent.start_worker?).to be_truthy
18
+  end
19
+
20
+  it "should build the worker_id" do
21
+    expect(@agent.worker_id).to eq('LongRunnableAgent--bf21a9e8fbc5a3846fb05b4fa0859e0917b2202f')
22
+  end
23
+
24
+  context "#setup_worker" do
25
+    it "returns active agent workers" do
26
+      mock(LongRunnableAgent).active { [@agent] }
27
+      workers = LongRunnableAgent.setup_worker
28
+      expect(workers.length).to eq(1)
29
+      expect(workers.first).to be_a(LongRunnableAgent::Worker)
30
+      expect(workers.first.agent).to eq(@agent)
31
+    end
32
+
33
+    it "returns an empty array when no agent is active" do
34
+      mock(LongRunnableAgent).active { [] }
35
+      workers = LongRunnableAgent.setup_worker
36
+      expect(workers.length).to eq(0)
37
+    end
38
+  end
39
+
40
+  describe LongRunnable::Worker do
41
+    before(:each) do
42
+      @agent = Object.new
43
+      @worker = LongRunnable::Worker.new(agent: @agent)
44
+      @worker.setup!(Rufus::Scheduler.new, Mutex.new)
45
+    end
46
+
47
+    it "calls boolify of the agent" do
48
+      mock(@agent).boolify('true') { true }
49
+      expect(@worker.boolify('true')).to be_truthy
50
+    end
51
+
52
+    it "expects run to be overriden" do
53
+      expect { @worker.run }.to raise_error(StandardError)
54
+    end
55
+
56
+    context "#run!" do
57
+      it "runs the agent worker" do
58
+        mock(@worker).run
59
+        @worker.run!.join
60
+      end
61
+
62
+      it "stops when rescueing a SystemExit" do
63
+        mock(@worker).run { raise SystemExit }
64
+        mock(@worker).stop!
65
+        @worker.run!.join
66
+      end
67
+
68
+      it "creates an agent log entry for a generic exception" do
69
+        stub(STDERR).puts
70
+        mock(@worker).run { raise "woups" }
71
+        mock(@agent).error(/woups/)
72
+        @worker.run!.join
73
+      end
74
+    end
75
+
76
+    context "#stop!" do
77
+      it "terminates the thread" do
78
+        mock(@worker.thread).terminate
79
+        @worker.stop!
80
+      end
81
+
82
+      it "gracefully stops the worker" do
83
+        mock(@worker).stop
84
+        @worker.stop!
85
+      end
86
+    end
87
+  end
88
+end

+ 102 - 0
spec/lib/agent_runner_spec.rb

@@ -0,0 +1,102 @@
1
+require 'spec_helper'
2
+
3
+describe AgentRunner do
4
+  context "without traps" do
5
+    before do
6
+      stub.instance_of(Rufus::Scheduler).every
7
+      stub.instance_of(AgentRunner).set_traps
8
+      @agent_runner = AgentRunner.new
9
+    end
10
+
11
+    context "#run" do
12
+      before do
13
+        mock(@agent_runner).run_workers
14
+        mock.instance_of(IO).puts('Stopping AgentRunner...')
15
+      end
16
+
17
+      it "runs until stop is called" do
18
+        mock.instance_of(Rufus::Scheduler).join
19
+        Thread.new { while @agent_runner.instance_variable_get(:@running) != false do sleep 0.1; @agent_runner.stop end }
20
+        @agent_runner.run
21
+      end
22
+
23
+      it "handles signals" do
24
+        @agent_runner.instance_variable_set(:@signal_queue, ['TERM'])
25
+        @agent_runner.run
26
+      end
27
+    end
28
+
29
+    context "#load_workers" do
30
+      before do
31
+        AgentRunner.class_variable_set(:@@agents, [HuginnScheduler, DelayedJobWorker])
32
+      end
33
+      it "loads all workers" do
34
+        workers = @agent_runner.send(:load_workers)
35
+        expect(workers).to be_a(Hash)
36
+        expect(workers.keys).to eq(['HuginnScheduler', 'DelayedJobWorker'])
37
+      end
38
+
39
+      it "loads only the workers specified in the :only option" do
40
+        @agent_runner = AgentRunner.new(only: HuginnScheduler)
41
+        workers = @agent_runner.send(:load_workers)
42
+        expect(workers.keys).to eq(['HuginnScheduler'])
43
+      end
44
+
45
+      it "does not load workers specified in the :except option" do
46
+        @agent_runner = AgentRunner.new(except: HuginnScheduler)
47
+        workers = @agent_runner.send(:load_workers)
48
+        expect(workers.keys).to eq(['DelayedJobWorker'])
49
+      end
50
+    end
51
+
52
+    context "running workers" do
53
+      before do
54
+        AgentRunner.class_variable_set(:@@agents, [HuginnScheduler, DelayedJobWorker])
55
+        stub.instance_of(IO).puts
56
+        stub.instance_of(LongRunnable::Worker).setup!
57
+      end
58
+
59
+      context "#run_workers" do
60
+
61
+        it "runs all the workers" do
62
+          mock.instance_of(HuginnScheduler).run!
63
+          mock.instance_of(DelayedJobWorker).run!
64
+          @agent_runner.send(:run_workers)
65
+        end
66
+
67
+        it "kills no longer active workers" do
68
+          mock.instance_of(HuginnScheduler).run!
69
+          mock.instance_of(DelayedJobWorker).run!
70
+          @agent_runner.send(:run_workers)
71
+          AgentRunner.class_variable_set(:@@agents, [DelayedJobWorker])
72
+          mock.instance_of(HuginnScheduler).stop!
73
+          @agent_runner.send(:run_workers)
74
+        end
75
+      end
76
+
77
+      context "#restart_dead_workers" do
78
+        before do
79
+          mock.instance_of(HuginnScheduler).run!
80
+          mock.instance_of(DelayedJobWorker).run!
81
+          @agent_runner.send(:run_workers)
82
+
83
+        end
84
+        it "restarts dead workers" do
85
+          stub.instance_of(HuginnScheduler).thread { OpenStruct.new(alive?: false) }
86
+          mock.instance_of(HuginnScheduler).run!
87
+          @agent_runner.send(:restart_dead_workers)
88
+        end
89
+      end
90
+    end
91
+  end
92
+
93
+  context "#set_traps" do
94
+    it "sets traps for INT TERM and QUIT" do
95
+      agent_runner = AgentRunner.new
96
+      mock(Signal).trap('INT')
97
+      mock(Signal).trap('TERM')
98
+      mock(Signal).trap('QUIT')
99
+      agent_runner.set_traps
100
+    end
101
+  end
102
+end

+ 28 - 0
spec/lib/delayed_job_worker_spec.rb

@@ -0,0 +1,28 @@
1
+require 'spec_helper'
2
+
3
+describe DelayedJobWorker do
4
+  before do
5
+    @djw = DelayedJobWorker.new
6
+  end
7
+
8
+  it "should run" do
9
+    mock.instance_of(Delayed::Worker).start
10
+    @djw.run
11
+  end
12
+
13
+  it "should stop" do
14
+    mock.instance_of(Delayed::Worker).start
15
+    mock.instance_of(Delayed::Worker).stop
16
+    @djw.run
17
+    @djw.stop
18
+  end
19
+
20
+  context "#setup_worker" do
21
+    it "should return an array with an instance of itself" do
22
+      workers = DelayedJobWorker.setup_worker
23
+      expect(workers).to be_a(Array)
24
+      expect(workers.first).to be_a(DelayedJobWorker)
25
+      expect(workers.first.id).to eq('DelayedJobWorker')
26
+    end
27
+  end
28
+end

+ 15 - 7
spec/lib/huginn_scheduler_spec.rb

@@ -4,17 +4,16 @@ require 'huginn_scheduler'
4 4
 describe HuginnScheduler do
5 5
   before(:each) do
6 6
     @scheduler = HuginnScheduler.new
7
+    stub(@scheduler).setup {}
8
+    @scheduler.setup!(Rufus::Scheduler.new, Mutex.new)
7 9
     stub
8 10
   end
9 11
 
10
-  it "should stop the scheduler" do
11
-    mock.instance_of(Rufus::Scheduler).stop
12
-    @scheduler.stop
13
-  end
14
-
15 12
   it "should register the schedules with the rufus scheduler and run" do
16 13
     mock.instance_of(Rufus::Scheduler).join
17
-    @scheduler.run!
14
+    scheduler = HuginnScheduler.new
15
+    scheduler.setup!(Rufus::Scheduler.new, Mutex.new)
16
+    scheduler.run
18 17
   end
19 18
 
20 19
   it "should run scheduled agents" do
@@ -53,7 +52,7 @@ describe HuginnScheduler do
53 52
     end
54 53
   end
55 54
 
56
-  describe "cleanup_failed_jobs!" do
55
+  describe "cleanup_failed_jobs!", focus: true do
57 56
     before do
58 57
       3.times do |i|
59 58
         Delayed::Job.create(failed_at: Time.now - i.minutes)
@@ -75,6 +74,15 @@ describe HuginnScheduler do
75 74
       ENV['FAILED_JOBS_TO_KEEP'] = old
76 75
     end
77 76
   end
77
+
78
+  context "#setup_worker" do
79
+    it "should return an array with an instance of itself" do
80
+      workers = HuginnScheduler.setup_worker
81
+      expect(workers).to be_a(Array)
82
+      expect(workers.first).to be_a(HuginnScheduler)
83
+      expect(workers.first.id).to eq('HuginnScheduler')
84
+    end
85
+  end
78 86
 end
79 87
 
80 88
 describe Rufus::Scheduler do

+ 73 - 0
spec/models/agents/jabber_agent_spec.rb

@@ -44,6 +44,17 @@ describe Agents::JabberAgent do
44 44
     end
45 45
   end
46 46
 
47
+  context "#start_worker?" do
48
+    it "starts when connect_to_receiver is truthy" do
49
+      agent.options[:connect_to_receiver] = 'true'
50
+      expect(agent.start_worker?).to be_truthy
51
+    end
52
+
53
+    it "does not start when connect_to_receiver is not truthy" do
54
+      expect(agent.start_worker?).to be_falsy
55
+    end
56
+  end
57
+
47 58
   describe "validation" do
48 59
     before do
49 60
       expect(agent).to be_valid
@@ -78,4 +89,66 @@ describe Agents::JabberAgent do
78 89
                        'Warning! Another Weather Alert! - http://www.weather.com/we-are-screwed'])
79 90
     end
80 91
   end
92
+
93
+  describe Agents::JabberAgent::Worker do
94
+    before(:each) do
95
+      @worker = Agents::JabberAgent::Worker.new(agent: agent)
96
+      @worker.setup
97
+      stub.any_instance_of(Jabber::Client).connect
98
+      stub.any_instance_of(Jabber::Client).auth
99
+    end
100
+
101
+    it "runs" do
102
+      agent.options[:jabber_receiver] = 'someJID'
103
+      mock.any_instance_of(Jabber::MUC::SimpleMUCClient).join('someJID')
104
+      @worker.run
105
+    end
106
+
107
+    it "stops" do
108
+      @worker.instance_variable_set(:@client, @worker.client)
109
+      mock.any_instance_of(Jabber::Client).close
110
+      mock.any_instance_of(Jabber::Client).stop
111
+      mock(@worker).thread { mock!.terminate }
112
+      @worker.stop
113
+    end
114
+
115
+    context "#message_handler" do
116
+      it "ignores messages for the first seconds" do
117
+        @worker.instance_variable_set(:@started_at, Time.now)
118
+        expect { @worker.message_handler(:on_message, [123456, 'nick', 'hello']) }
119
+          .to change { agent.events.count }.by(0)
120
+      end
121
+
122
+      it "creates events" do
123
+        @worker.instance_variable_set(:@started_at, Time.now - 10.seconds)
124
+        expect { @worker.message_handler(:on_message, [123456, 'nick', 'hello']) }
125
+          .to change { agent.events.count }.by(1)
126
+        event = agent.events.last
127
+        expect(event.payload).to eq({'event' => 'on_message', 'time' => 123456, 'nick' => 'nick', 'message' => 'hello'})
128
+      end
129
+    end
130
+
131
+    context "#normalize_args" do
132
+      it "handles :on_join and :on_leave" do
133
+        time, nick, message = @worker.send(:normalize_args, :on_join, [123456, 'nick'])
134
+        expect(time).to eq(123456)
135
+        expect(nick).to eq('nick')
136
+        expect(message).to be_nil
137
+      end
138
+
139
+      it "handles :on_message and :on_leave" do
140
+        time, nick, message = @worker.send(:normalize_args, :on_message, [123456, 'nick', 'hello'])
141
+        expect(time).to eq(123456)
142
+        expect(nick).to eq('nick')
143
+        expect(message).to eq('hello')
144
+      end
145
+
146
+      it "handles :on_room_message" do
147
+        time, nick, message = @worker.send(:normalize_args, :on_room_message, [123456, 'hello'])
148
+        expect(time).to eq(123456)
149
+        expect(nick).to be_nil
150
+        expect(message).to eq('hello')
151
+      end
152
+    end
153
+  end
81 154
 end

+ 124 - 0
spec/models/agents/twitter_stream_agent_spec.rb

@@ -125,4 +125,128 @@ describe Agents::TwitterStreamAgent do
125 125
       end
126 126
     end
127 127
   end
128
+
129
+  context "#setup_worker" do
130
+    it "ensures the dependencies are available" do
131
+      mock(STDERR).puts(Agents::TwitterStreamAgent.twitter_dependencies_missing)
132
+      mock(Agents::TwitterStreamAgent).dependencies_missing? { true }
133
+      expect(Agents::TwitterStreamAgent.setup_worker).to eq(false)
134
+    end
135
+
136
+    it "returns no workers if no agent is active" do
137
+      mock(Agents::TwitterStreamAgent).active { [] }
138
+      expect(Agents::TwitterStreamAgent.setup_worker).to eq([])
139
+    end
140
+
141
+    it "returns a worker for an active agent" do
142
+      mock(Agents::TwitterStreamAgent).active { [@agent] }
143
+      workers = Agents::TwitterStreamAgent.setup_worker
144
+      expect(workers).to be_a(Array)
145
+      expect(workers.length).to eq(1)
146
+      expect(workers.first).to be_a(Agents::TwitterStreamAgent::Worker)
147
+      filter_to_agent_map = workers.first.config[:filter_to_agent_map]
148
+      expect(filter_to_agent_map.keys).to eq(['keyword1', 'keyword2'])
149
+      expect(filter_to_agent_map.values).to eq([[@agent], [@agent]])
150
+    end
151
+
152
+    it "correctly maps keywords to agents" do
153
+      agent2 = @agent.dup
154
+      agent2.id = 123455
155
+      agent2.options[:filters] = ['agent2']
156
+      mock(Agents::TwitterStreamAgent).active { [@agent, agent2] }
157
+
158
+      workers = Agents::TwitterStreamAgent.setup_worker
159
+      filter_to_agent_map = workers.first.config[:filter_to_agent_map]
160
+      expect(filter_to_agent_map.keys).to eq(['keyword1', 'keyword2', 'agent2'])
161
+      expect(filter_to_agent_map['keyword1']).to eq([@agent])
162
+      expect(filter_to_agent_map['agent2']).to eq([agent2])
163
+    end
164
+  end
165
+
166
+  describe Agents::TwitterStreamAgent::Worker do
167
+    before(:each) do
168
+      @mock_agent = mock!
169
+      @config = {agent: @agent, config: {filter_to_agent_map: {'agent' => [@mock_agent]}}}
170
+      @worker = Agents::TwitterStreamAgent::Worker.new(@config)
171
+      @worker.setup
172
+    end
173
+
174
+    context "#run" do
175
+      it "calls the agent to process the tweet" do
176
+        stub.instance_of(IO).puts
177
+        mock(@mock_agent).name { 'mock' }
178
+        mock(@mock_agent).process_tweet('agent', {'text' => 'agent'})
179
+        mock(@worker).stream!(['agent'], @agent).yields({'text' => 'agent'})
180
+
181
+        @worker.run
182
+      end
183
+      it "skips retweets" do
184
+        mock.instance_of(IO).puts('Skipping retweet: retweet')
185
+        mock(@worker).stream!(['agent'], @agent).yields({'retweeted_status' => {'' => true}, 'text' => 'retweet'})
186
+
187
+        @worker.run
188
+      end
189
+
190
+      it "deduplicates tweets" do
191
+        mock.instance_of(IO).puts("dup")
192
+        mock.instance_of(IO).puts("Skipping duplicate tweet: dup")
193
+        # RR does not support multiple yield calls
194
+        class DoubleYield < Agents::TwitterStreamAgent::Worker
195
+          def stream!(_, __, &block)
196
+            yield({'text' => 'dup'})
197
+            yield({'text' => 'dup'})
198
+          end
199
+        end
200
+        worker = DoubleYield.new(@config)
201
+
202
+        worker.run
203
+      end
204
+    end
205
+
206
+    context "#stream!" do
207
+      before(:each) do
208
+        @client_mock = mock!
209
+        stub(@worker).client { @client_mock }
210
+      end
211
+
212
+      it "calls the sample method without filters" do
213
+        @client_mock.sample
214
+        @worker.send(:stream!, [], @mock_agent)
215
+      end
216
+
217
+      it "calls the filter method when filters are provided" do
218
+        @client_mock.filter(track: 'filter')
219
+        @worker.send(:stream!, ['filter'], @mock_agent)
220
+      end
221
+
222
+      it "only handles instances of Twitter::Tweet" do
223
+        @client_mock.sample.yields(Object.new)
224
+        expect { |blk| @worker.send(:stream!, [], @mock_agent, &blk) }.not_to yield_control
225
+      end
226
+
227
+      it "yields Hashes for received Twitter::Tweet instances" do
228
+        @client_mock.sample.yields(Twitter::Tweet.new(id: '1234', text: 'test'))
229
+        expect { |blk| @worker.send(:stream!, [], @mock_agent, &blk) }.to yield_with_args({'id' => '1234', 'text' => 'test'})
230
+      end
231
+
232
+      it "backs off 60 seconds for every Twitter::Error::TooManyRequests exception rescued" do
233
+        stub.instance_of(IO).puts
234
+        mock(@worker).sleep(60)
235
+        @client_mock.sample { raise Twitter::Error::TooManyRequests }
236
+        @worker.send(:stream!, [], @mock_agent)
237
+        @client_mock.sample { raise Twitter::Error::TooManyRequests }
238
+        mock(@worker).sleep(120)
239
+        @worker.send(:stream!, [], @mock_agent)
240
+      end
241
+    end
242
+
243
+    context "#client" do
244
+      it "initializes the client" do
245
+        client = @worker.send(:client)
246
+        expect(client).to be_a(Twitter::Streaming::Client)
247
+        expect(client.access_token).to eq('1234token')
248
+        expect(client.access_token_secret).to eq('56789secret')
249
+      end
250
+    end
251
+  end
128 252
 end